package com.isyscore.os.etl.config;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.isyscore.boot.login.LoginUserManagerImpl;
import com.isyscore.device.common.model.RespDTO;
import com.isyscore.os.core.entity.Resource;
import com.isyscore.os.core.exception.DataFactoryException;
import com.isyscore.os.core.exception.ErrorCode;
import com.isyscore.os.core.model.dto.LicenseQueryDTO;
import com.isyscore.os.core.model.entity.DataFactoryConfig;
import com.isyscore.os.core.model.entity.JobConfig;
import com.isyscore.os.core.model.entity.JobRunLog;
import com.isyscore.os.core.model.enums.LicenseEnable;
import com.isyscore.os.core.model.enums.LicenseOption;
import com.isyscore.os.core.util.InitiallyUtils;
import com.isyscore.os.etl.constant.CommonConstant;
import com.isyscore.os.etl.manager.QuartzJobManager;
import com.isyscore.os.etl.model.StatusAndLogHandler;
import com.isyscore.os.etl.model.dto.DmcUploadRespDTO;
import com.isyscore.os.etl.model.enums.JobConfigStatus;
import com.isyscore.os.etl.model.enums.JobRunStatus;
import com.isyscore.os.etl.service.DmcService;
import com.isyscore.os.etl.service.JobRunLogService;
import com.isyscore.os.etl.service.LicenseService;
import com.isyscore.os.etl.service.impl.DataFactoryConfigServiceimpl;
import com.isyscore.os.etl.service.impl.JobConfigServiceImpl;
import com.isyscore.os.etl.service.impl.ResourceServiceImpl;
import com.isyscore.os.etl.utils.CronUtils;
import com.isyscore.os.permission.common.constants.PermissionConstants;
import com.isyscore.os.permission.entity.LoginVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.FileInputStream;
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;

import static com.isyscore.os.etl.constant.CommonConstant.DEPLOY_MODE_STANDALONE;

/**
 * 任务调度改配置器
 *
 * @author felixu
 * @since 2021.08.11
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class JobConfiguration implements ApplicationRunner {
    @Value("${service.license.check:true}")
    private Boolean enableLicenseCheck;

    public String getCurrentRole() {
        return currentRole;
    }

    private String currentRole = "slave";

    private final JobConfigServiceImpl jobConfigService;


    private final ThreadPoolExecutor statusExecutor;

    private final JobRunLogService jobRunLogService;

    private final LicenseService licenseService;


    private final DataFactoryConfigServiceimpl factoryConfigService;

    @Value("${spring.application.name}")
    private String SERVICE_NAME;

    private final DmcService dmcService;

    private final ResourceServiceImpl resourceService;

    @Value("${service.dmc.file-prefix}")
    private String DMC_BASE_PATH;


    @Value("${flink-config.extend-lib}")
    private String BASE_DIR;

    private final LoginUserManagerImpl loginUserManager;
    @Value("${etl.deploy-mode}")
    private String deployMode = DEPLOY_MODE_STANDALONE;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        licenseCheck();
        if (deployMode.equalsIgnoreCase(DEPLOY_MODE_STANDALONE)) {
            this.switch2Master();
        }
        log.info("当前的服务的角色为：" + currentRole);
    }

    @Autowired
    private QuartzJobManager quartzJobManager;
    @Autowired
    private JobConfiguration jobConfiguration;

    public void initScheduleTask() throws Exception {
        log.debug("系统启动完成，开始预校验所有任务。。。");
        InitiallyUtils.markInitially(() ->
                        jobConfigService.list(new LambdaQueryWrapper<JobConfig>()
                                .eq(JobConfig::getIsScheduled, CommonConstant.TRUE)))
                .forEach(job -> {
                    String cron = job.getCron();
                    if (!CronUtils.isValid(cron)) {
                        throw new DataFactoryException(ErrorCode.CRON_VALID);
                    }
                    try {
                        //将任务租户信息传入quartz，以便其他线程获取租户信息
                        LoginVO loginVO = new LoginVO();
                        loginVO.setTenantId(job.getTenantId());
                        quartzJobManager.addJob(job.getId(), cron, loginVO, true);
                    } catch (Exception e) {
                        log.error("定时任务开启失败{}", e);
                        job.setIsScheduled(CommonConstant.FALSE);
                        jobConfigService.updateById(job);
                    }
                });
        Map<String, String> logIdMappings = new HashMap<>();
        InitiallyUtils.markInitially(() -> jobRunLogService.list(Wrappers.lambdaQuery(JobRunLog.class)
                                .eq(JobRunLog::getJobStatus, JobRunStatus.INITIALIZING.name()))
                        .parallelStream())
                .forEach(runLog -> logIdMappings.putAll(ImmutableMap.of(runLog.getJobConfigId(), runLog.getId())));
        //正在运行的任务,需要重新拉起子线程
        InitiallyUtils.markInitially(() ->
                        jobConfigService
                                .list(Wrappers.lambdaQuery(JobConfig.class)
                                        .eq(JobConfig::getIsScheduled, CommonConstant.FALSE)
                                        .in(JobConfig::getStatus, JobConfigStatus.RUNNING.getCode(), JobConfigStatus.STARTING.getCode())))
                .forEach(job -> {
                    //如果是 starting 状态
                    if (Objects.equals(job.getStatus(), JobConfigStatus.RUNNING.getCode())) {
                        String jobRunLogId = logIdMappings.get(job.getId());
                        //如果log不存在，重新插入一条log记录
                        if (Objects.isNull(jobRunLogId)) {
                            jobRunLogId = jobConfigService.insertJobRunLog(job);
                        }
                        statusExecutor.execute(new StatusAndLogHandler(new StringBuffer(), job.getJobId(), job,
                                jobRunLogId, loginUserManager.getCurrentLoginUser()));
                    } else if (Objects.equals(job.getStatus(), JobConfigStatus.STARTING.getCode())) {
                        //重新上传提交任务
                        jobConfigService.startJobInternal(job, false);
                    }

                });
        initConnector();
    }

    public void initConnector() throws Exception {
        //连接器初始化内置操作
        DataFactoryConfig initFlagConfig = factoryConfigService.getOne(Wrappers.lambdaQuery(new DataFactoryConfig())
                .eq(DataFactoryConfig::getParamName, CommonConstant.INITIALIZED)
                .eq(DataFactoryConfig::getServiceName, SERVICE_NAME));

        //过滤出已经内置过的， 避免重复上传
        Map<String, DataFactoryConfig> initializing = factoryConfigService.list(Wrappers
                        .lambdaQuery(new DataFactoryConfig())
                        .eq(DataFactoryConfig::getParamKey, CommonConstant.INNER_CONNECTOR)
                        .eq(DataFactoryConfig::getServiceName, SERVICE_NAME))
                .parallelStream()
                .filter(el -> CommonConstant.FALSE.equals(Integer.valueOf(el.getParamValue())))
                .collect(Collectors.toMap(el -> el.getParamName(), el -> el));
        List<DataFactoryConfig> dataFactoryConfigsUpdate = new ArrayList<>();
        List<Resource> resourcesAdd = new ArrayList<>();
        List<Resource> resourcesUpdate = new ArrayList<>();
        boolean allSuccess = true;
        if (!Integer.valueOf(initFlagConfig.getParamValue()).equals(CommonConstant.TRUE)) {
            log.info("初始化内置连接器...");
            //初始化内置连接器
            File dir = new File(BASE_DIR);
            if (dir.isDirectory()) {
                File[] files = dir.listFiles();
                //依次上传每个类型的连接器
                for (File f : files) {
                    //忽略目标目录下的子目录
                    if (f.isDirectory()) {
                        continue;
                    } else {
                        String name = f.getName();
                        DataFactoryConfig ele = initializing.get(name);
                        if (ele != null) {
                            log.info("正在上传连接器:{}", name);
                            FileInputStream fis = new FileInputStream(f);
                            MockMultipartFile file = new MockMultipartFile(name, name, "", fis);
                            RespDTO<DmcUploadRespDTO> res = dmcService.upload(file, CommonConstant.OTHER_FILE_TYPE, CommonConstant.MODULE_NAME);
                            if (CommonConstant.DMC_HTTP_SUCCESS != res.getCode()) {
                                log.error("内置连接器{}上传文件错误:{}", f.getName(), res.getMessage());
                                allSuccess = false;
                                //跳过
                            } else {
                                //修改配置状态
                                ele.setParamValue("1");
                                dataFactoryConfigsUpdate.add(ele);
                                Resource resourceOld = InitiallyUtils.markInitially(() -> resourceService.getOne(Wrappers.lambdaQuery(Resource.class).eq(Resource::getName, ele.getRemark()), false));
                                if (ObjectUtil.isNull(resourceOld)) {
                                    //新增一条资源记录
                                    Resource r = new Resource();
                                    r.setFileName(name);
                                    r.setName(CommonConstant.CONNECTOR_MAPPING.get(name));
                                    r.setRemark("内置连接器");
                                    r.setUrl(DMC_BASE_PATH + res.getData().getResPath() + File.separator + res.getData().getResName());
                                    r.setTenantId(PermissionConstants.GLOBAL_BUSINESS_DATA_TENANT_ID_KEY);
                                    //用extra字段中connectType属性存储连接类型
                                    JSONObject extraObject = JSONObject.parseObject(ele.getExtra());
                                    if(ObjectUtil.isNotNull(extraObject)){
                                        r.setConnectType(Integer.valueOf(extraObject.getString("connectType")));
                                    }
                                    resourcesAdd.add(r);
                                } else {
                                    //修改资源记录，只改地址和文件名以及连接类型
                                    //3.1.0只更新connectType不更新其他字段，避免升级后历史任务依赖无法关联
//                                    resourceOld.setFileName(name);
//                                    resourceOld.setUrl(DMC_BASE_PATH + res.getData().getResPath() + File.separator + res.getData().getResName());
                                    //用extra字段中connectType属性存储连接类型
                                    JSONObject extraObject = JSONObject.parseObject(ele.getExtra());
                                    if(ObjectUtil.isNotNull(extraObject)) {
                                        resourceOld.setConnectType(Integer.valueOf(extraObject.getString("connectType")));
                                    }
                                    resourcesUpdate.add(resourceOld);
                                }
                            }
                        }
                    }
                }
                if (allSuccess) {
                    log.info("内置连接器全部上传成功");
                    initFlagConfig.setParamValue("1");
                    dataFactoryConfigsUpdate.add(initFlagConfig);
                }
                factoryConfigService.updateBatchById(dataFactoryConfigsUpdate);
                InitiallyUtils.markInitially(()->resourceService.saveBatch(resourcesAdd));
                InitiallyUtils.markInitially(()->resourceService.updateBatchById(resourcesUpdate));
            }
        }
    }

    public void switch2Master() throws Exception {
        this.currentRole = "master";
        quartzJobManager.stopAllMetricJob();
        jobConfiguration.initScheduleTask();
    }

    public void switch2Slave() {
        this.currentRole = "slave";
        quartzJobManager.stopAllMetricJob();
    }

    public void licenseCheck() {
        boolean check = Boolean.FALSE;
        try {
            LicenseQueryDTO dto= LicenseQueryDTO.builder().ids(Lists.newArrayList(LicenseOption.UDMP_ENABLE.getCode())).build();
            RespDTO<List> respDTO = licenseService.query(dto);
            List licenseResp=respDTO.getData();
            if(CollUtil.isNotEmpty(licenseResp)){
                Integer udmpEnable= (Integer) licenseResp.get(0);
                if(LicenseEnable.ENABLE.getCode().equals(udmpEnable)){
                    log.info("License授权认证成功:UDMP授权状态:{}",udmpEnable);
                    check = true;
                }else {
                    log.error("License授权认证失败:UDMP授权状态:{}",udmpEnable);
                }
            }else {
                log.error("License授权认证失败:授权服务返回信息不正确");
            }
        } catch (Exception e) {
            log.error("License授权认证失败:{}", e.getMessage());
            check = getDefaultCheckStatus();
        }finally {
            loginUserManager.setAppLicenseStatus(check);
        }
    }

    /**
     * 获取默认授权
     * @return 如果开启授权检查，返回FALSE，否则返回TRUE
     */
    private Boolean getDefaultCheckStatus(){
        if(enableLicenseCheck){
            return Boolean.FALSE;
        }else {
            return Boolean.TRUE;
        }
    }
}
